[SPARK-44280][SQL] Add convertJavaTimestampToTimestamp in JDBCDialect API #41843
Conversation
* @throws IllegalArgumentException if t is null
 */
def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
  require(t != null, "timestamp must be non-null")
Is this a user-facing error? If so, please add an error class to error-classes.json. See for instance:
throw QueryCompilationErrors.unsupportedTableChangeInJDBCCatalogError(change)
It is not a user-facing error but a precondition: the timestamp passed into this function will never be null. I removed the null check because a similar function, convertJavaTimestampToTimestampNTZ in JdbcDialects.scala, does not have the check either.
cc @sadikovi FYI
LGTM!
Can we provide more information about the overflow issue? IIRC, Spark doesn't accommodate infinite timestamps. It may be more appropriate to fail on overflow or unsupported values instead of converting them to maximum values.
* PostgreSQL has four special "infinity values" that we need to clamp to avoid overflow.
 * If it is not one of the infinity values, fall back to default behavior. */
override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {

nit: no new line is needed.
* If it is not one of the infinity values, fall back to default behavior. */
override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {

// variable names come from PostgreSQL "constant field docs":
nit: Please update to this: // Variable names come from ...
@mingkangli-db Please update the title of the PR. It is not [CORE] but [SQL].
@yaooqinn Additional information is provided in the PR description: if a Postgres table contains infinity timestamps, users cannot read such tables in Spark. I don't think failing such queries is an option: they already fail even without this change, albeit with an overflow error. The Postgres driver handles infinity values as mentioned in the PR; Spark simply fails to convert them. This PR adds the ability to bypass Spark conversion for such values. For example, org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(-9223372036832400000L)) would fail with java.lang.ArithmeticException: long overflow.
IMHO, it could be beneficial to refactor the code and add a special conversion function, since other JDBC dialects could potentially require similar timestamp handling. Also, this infinity timestamp fix is only for Postgres, not all dialects.
if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
    time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
  Long.MaxValue
If we query an infinite timestamp column in pgsql, what does pgsql display?
It will be displayed as "+infinity" or "-infinity". See here: "The values infinity and -infinity are specially represented inside the system and will be displayed unchanged."
val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L

val time = t.getTime
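As an aside, these magic numbers appear to match the sentinel constants that the PostgreSQL JDBC driver exposes on org.postgresql.PGStatement. A quick check, assuming pgjdbc is on the classpath; the expected values are the ones quoted in this thread, so treat the mapping as an inference, not something stated in the PR:

import org.postgresql.PGStatement

// pgjdbc publishes the infinity sentinels as public constants; the values
// quoted in the diff above line up with them.
assert(PGStatement.DATE_POSITIVE_INFINITY == 9223372036825200000L)
assert(PGStatement.DATE_NEGATIVE_INFINITY == -9223372036832400000L)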
So the range of java Timestamp is larger than Spark SQL timestamp?
What is the Spark SQL timestamp range?
From Long.MinValue microseconds before UTC epoch to Long.MaxValue microseconds after UTC epoch.
Then it is the same range for java Timestamp.
Then how does the overflow happen? Because the calendar is different?
The problem is that java.sql timestamps are measured with millisecond accuracy (from Long.MinValue milliseconds to Long.MaxValue milliseconds, see here), while Spark timestamps are measured with microsecond accuracy. So we would get an overflow exception when we call Math.multiplyExact by 1000 in Java.
The stacktrace would look something like this:
at java.lang.Math.multiplyExact(Math.java:892)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:xxx)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestampNoRebase(DateTimeUtils.scala:xxx)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:xxx)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$15(JdbcUtils.scala:xxx)
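A minimal standalone sketch of that failure mode (plain Scala, not the Spark code itself; the sentinel is the positive-infinity constant quoted above):

import java.sql.Timestamp

// PostgreSQL 'infinity' arrives as a millisecond value close to Long.MaxValue.
val infinityMillis = new Timestamp(9223372036825200000L).getTime

// Spark converts millis to micros with Math.multiplyExact, which throws
// ArithmeticException("long overflow") for a value this large.
try {
  Math.multiplyExact(infinityMillis, 1000L)
} catch {
  case e: ArithmeticException => println(s"overflow: ${e.getMessage}") // prints: overflow: long overflow
}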
To simplify, Long.MaxValue should be the max value in microseconds to not overflow, is that right @mingkangli-db?
Yes, Long.MaxValue itself is the maximum value in microseconds that can be stored without causing an overflow. I added some comments in PostgresDialect.scala; hopefully this makes it clearer.
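To make the boundary concrete, a small sketch (the arithmetic is mine, not from the PR): the largest millisecond value that survives the millis-to-micros conversion is Long.MaxValue / 1000, and both Postgres sentinels are far beyond it.

// Largest java.sql.Timestamp millis value convertible to micros without overflow.
val maxConvertibleMillis = Long.MaxValue / 1000 // 9223372036854775L

Math.multiplyExact(maxConvertibleMillis, 1000L) // fine: 9223372036854775000L
// Math.multiplyExact(maxConvertibleMillis + 1, 1000L) // throws ArithmeticException
// The sentinel 9223372036825200000L exceeds maxConvertibleMillis by roughly a factor of 1000.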
@@ -432,4 +437,18 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
    assert(row(0).getSeq[String](0) == Seq("1", "fds", "fdsa"))
    assert(row(0).getString(1) == "fdasfasdf")
  }

  test("SPARK-44280: infinity timestamp test") {
    val df = sqlContext.read.jdbc(jdbcUrl, "infinity_timestamp", new Properties)
Can we also use the write API to do a round trip here?
The goal here is that, upon reading one of the infinity timestamps in PostgreSQL, it is cast into a reasonable value in Spark SQL instead of throwing an overflow error. In the other direction, however, since there are no built-in "infinity" values in Spark SQL, we can't write an infinity timestamp value to PostgreSQL, so I don't think a roundtrip test is possible here.
thanks, merging to master/3.5 (fix pgsql dialect)!
… API

Closes #41843 from mingkangli-db/SPARK-44280.
Authored-by: Mingkang Li <mingkang.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e91c024)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)

assert(infinity.getTime == maxTimestamp)
assert(negativeInfinity.getTime == minTimeStamp)
ts.getTime() returns epoch in milliseconds, and LocalDateTime.toEpochSecond() returns values in seconds. Shouldn't this be
val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1000
val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC) * 1000
Otherwise, convertJavaTimestampToTimestamp() does not return infinities as expected.
val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)

val time = t.getTime
@yaooqinn, @mingkangli-db, @cloud-fan
ts.getTime() returns epoch in milliseconds, the Timestamp(long) constructor takes time in milliseconds, and LocalDateTime.toEpochSecond() returns values in seconds. Shouldn't this be
val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1000
val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC) * 1000
i.e., with * 1000? Otherwise:
val tsLong = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
val ts = new Timestamp(tsLong)
gives:
tsLong: Long = 253402300799
ts: java.sql.Timestamp = 1978-01-11 22:31:40.799
not 9999-12-31, and time is used later to create a new instance of Timestamp.
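A standalone reproduction of the unit mismatch described above (plain Scala; the values match the reviewer's example, and the rendered dates depend on the JVM's default time zone):

import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneOffset}

// toEpochSecond returns SECONDS, but the Timestamp(long) constructor expects MILLISECONDS.
val seconds = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
println(new Timestamp(seconds))        // ~1978-01-11: seconds misread as millis
println(new Timestamp(seconds * 1000)) // 9999-12-31 as intended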
Nice catch! We shall fix this...
…ITY timestamps

### What changes were proposed in this pull request?
This PR fixes a bug involved with #41843: epoch seconds were used instead of epoch millis to create a timestamp value.
### Why are the changes needed?
Bugfix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Revised tests.
### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45599 from yaooqinn/SPARK-47473.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
…estampToTimestamp for JdbcDialect

### What changes were proposed in this pull request?
Add convertDateToDate, like the existing convertTimestampToTimestamp, for JdbcDialect.
### Why are the changes needed?
The date '±infinity' values cause overflows like timestamp '±infinity' in #41843.
### Does this PR introduce _any_ user-facing change?
Fixes the expected overflow for dates to align with the timestamps of PostgreSQL.
### How was this patch tested?
New tests.
### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45638 from yaooqinn/SPARK-47501.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
This PR fixes an overflow issue upon reading an "infinity" timestamp from PostgreSQL. It addresses the issue by adding a new function, convertJavaTimestampToTimestamp, to the JDBCDialects API and overriding it in PostgresDialect.scala to clamp the special "infinity" timestamps to Long.MaxValue and Long.MinValue.
Why are the changes needed?
The pre-existing default behavior of timestamp conversion can trigger an overflow on these special values (i.e., the executor would crash if you select a column that contains infinity timestamps in PostgreSQL). By integrating this new function, we can mitigate such issues, enabling more versatile and robust timestamp value conversions across various JDBC-based connectors.
Does this PR introduce any user-facing change?
A new function, convertJavaTimestampToTimestamp, is added to the JDBCDialects API to allow JDBC dialects to override the default behavior of converting Java timestamps.
How was this patch tested?
An integration test was added in PostgresIntegrationSuite.scala to verify it can handle +infinity and -infinity timestamps in PostgreSQL.
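For reference, a sketch of the Postgres override assembled from the fragments quoted in this review. The positive constants and the clamping branch come from the diff excerpts; the negative constants and the fallback call are assumptions (the second negative constant is a guessed mirror of the positive one), and the merged code may differ:

import java.sql.Timestamp

// Hypothetical stand-in for the JdbcDialect base, just enough to compile the
// sketch; the assumed default conversion is millis -> micros.
trait DialectBase {
  def convertJavaTimestampToTimestamp(t: Timestamp): Long =
    Math.multiplyExact(t.getTime, 1000L)
}

object PostgresDialectSketch extends DialectBase {
  /* PostgreSQL has four special "infinity values" that we need to clamp to avoid overflow.
   * If it is not one of the infinity values, fall back to default behavior. */
  override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
    // Variable names come from PostgreSQL "constant field docs".
    val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
    val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L
    val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L // value seen in the thread
    val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L // assumed mirror value

    val time = t.getTime
    if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
        time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
      Long.MaxValue // clamp +infinity to Spark's maximum timestamp
    } else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
        time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
      Long.MinValue // clamp -infinity to Spark's minimum timestamp
    } else {
      super.convertJavaTimestampToTimestamp(t) // default conversion (assumption)
    }
  }
}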